package com.lmm.cg.zjgcg.task;

import android.util.Log;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;

public abstract class BaseTask<M> {

    TaskCallBack callBack;
    private final String TAG = getClass().getName();

    public BaseTask(TaskCallBack callBack) {
        this.callBack = callBack;
    }

    public void run() {

        Observable<M> observable = Observable.create(new ObservableOnSubscribe<M>() {
            @Override
            public void subscribe(ObservableEmitter<M> emitter) throws Exception {
                exec(emitter);
            }
        });

        Consumer<M> consumer = new Consumer<M>() {
            @Override
            public void accept(M integer) throws Exception {
                Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
                Log.d(TAG, "onNext: " + integer);

                callBack.onSuccess(integer);
            }
        };

        Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
                Log.i(TAG, "error" + throwable.getMessage());
                callBack.onFail(throwable.getMessage());
            }
        };


        Action onCompleteAction = new Action() {
            @Override
            public void run() throws Exception {
                Log.i(TAG, "complete");
                callBack.onFinish();
            }
        };

        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer, onErrorConsumer, onCompleteAction);
    }

    public abstract void exec(ObservableEmitter emitter);

}
